windows下kafka安装启动以及使用 您所在的位置:网站首页 windows 安装kafaka windows下kafka安装启动以及使用

windows下kafka安装启动以及使用

#windows下kafka安装启动以及使用| 来源: 网络整理| 查看: 265

Zookeeper

kafka用到了zookeeper,现在的kafka会自带zookeeper,如果我们要自己安装的话 也可以

zookeeper的下载地址为:https://zookeeper.apache.org/releases.html#download

下载成功之后解压,j进入bin目录,然后通过下面的命令启动zookeeper.

zkServer.cmd

在zookeeper使用时,可能会遇到一些错误,关于这些错误的问题可以参考博主的这篇博客:zookeeper安装启动的一些问题

Kafka

然后我们需要需要下载kafka,kafka的下载地址为:http://kafka.apache.org/downloads

解压并进入Kafka目录,笔者:D:\Kafka\kafka_2.12-0.11.0.0

 进入config目录找到文件server.properties并打开

找到并编辑zookeeper.connect=localhost:2181

Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181

 进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:

bin\windows\kafka-server-start.bat config\server.properties

kafka有自带的zookeeper,启动自带的zookeeper可以通过如下命令:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

如果在执行该命令时报错:

The syntax of the command is incorrect.

则有可能是kafka安装路径中的横杠导致的,这时候我们将kafka拷贝到一个没有横杠的路径下即可。参考:https://stackoverflow.com/questions/20105735/error-the-syntax-of-the-command-is-incorrect-when-renaming-a-file

如果安装没有问题了,使用命令启动单点集群的kafka时,报错:

Connection to node 0 (host.docker.internal/10.130.228.45:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient

博主解决的办法是打开,在kafka安装路径下的config/server.properties文件中,修改下面的配置为:

listeners=PLAINTEXT://localhost:9092  

参考:https://stackoverflow.com/questions/47677549/kafka-zookeeper-connection-to-node-1-could-not-be-established-broker-may-no

创建一个topic

命令为:

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

测试该topic是否创建成功:

bin\windows\kafka-topics.sh --list --bootstrap-server localhost:9092 发送message > bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test This is a message This is another message 接收message > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message 启动kafka集群

在本地拷贝两个server.properties

> copy config\server.properties config\server-1.properties > copy config\server.properties config/server-2.properties

更改下面的参数

config/server-1.properties: broker.id=1 listeners=PLAINTEXT://XXX:9093 log.dirs=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://XXX:9094 log.dirs=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的唯一且永久的名称。 我们只需要覆盖端口和日志目录,这是因为我们都在同一台计算机上运行它们,并且希望所有代理都不要试图在同一端口上注册或覆盖彼此的数据。

我们有Zookeeper并启动了单个节点,因此我们只需要启动两个新节点:

> bin\kafka-server-start.bat config\server-1.properties ... > bin\kafka-server-start.bat config\server-2.properties ...

现在,创建一个具有三个复制factor的新topic:

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好的,但是现在有了集群,我们如何知道哪个broker在做什么?我们需要运行“describe topics”命令:

bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic my-replicated-topic Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0

 第一行给出了所有分区的摘要,每一行都给出了有关一个分区的信息。 由于该主题只有一个分区,因此只有一行。

“leader”是负责给定分区的所有读取和写入的节点。 每个节点将成为分区的随机选择部分的领导者。“replics”是为该分区复制日志的节点列表,无论它们是引导者还是当前处于活动状态。“ isr”是“同步”副本的集合。 这是副本列表的子集,当前仍处于活动状态并追随领导者。

请注意,在我的示例中,节点2是topic唯一分区的leader。我们可以在创建的之前的topic 上运行相同的命令,以查看其信息

kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

因此,如我们期待的一样-之前的topic没有replicas 副本,并且位于服务器0上,这是我们创建群集时集群中唯一的服务器。

让我们向我们的新topic发布一些消息:

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic my-replicated-topic >my test message 1 >my test message 2 >Terminate batch job (Y/N)? Y

然后consume这些message

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-replicated-topic --from-beginning my test message 1 my test message 2 Processed a total of 2 messages Terminate batch job (Y/N)? Y

现在让我们测试一下容错能力。 broker 2扮演的是leader的角色,所以让我们kill它:

wmic process where "caption = 'java.exe' and commandline like '%server-2.properties%'" get processid ProcessId 8760 C:\Users\tools\kafka>taskkill /pid 8760 /f SUCCESS: The process with PID 8760 has been terminated.

在kill之后,leader自动变为了server 1

kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic my-replicated-topic Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0

但是,即使最初进行写操作的leader已经down掉了,message仍然可供使用:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-replicated-topic --from-beginning my test message 1 my test message 2

原因在于,我们设置了备份。

 

 



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有